Support Iceberg positional deletes #5897
Conversation
Force-pushed from ecba7d1 to 88037c6.
This is the first cut to support reading Iceberg tables with delete files. The design doc can be found at facebookincubator#5977
@yingsu00 Some high-level comments while we converge on the overall design.
```cpp
auto& deleteFiles = icebergSplit->deleteFiles;
for (auto& deleteFile : deleteFiles) {
  if (deleteFile.content == FileContent::kEqualityDeletes) {
    readEqualityDeletes(deleteFile);
```
Move equality deletes to another PR.
```diff
@@ -20,5 +20,6 @@ target_link_libraries(
   velox_gcs
   velox_core
   velox_hive_connector
+  velox_hive_iceberg_datasource
```
These CMakeLists.txt additions should be redundant, since `velox_hive_iceberg_datasource` is added to `velox_hive_connector`.
@majetideepak It didn't work and threw link errors. Do you know how to make `velox_hive_connector` contain `velox_hive_iceberg_datasource`? I tried adding a PUBLIC link to `velox_hive_iceberg_datasource`, but that didn't work either.
```cpp
      _length,
      _partitionKeys,
      _tableBucketNumber) {
  // TODO: Deserialize _extraFileInfo to get deleteFiles;
```
finish TODO.
> finish TODO.

The delete file serialization is not done on the presto_cpp side yet, so I plan to do this in a future PR, after we decide how to serialize the delete files.
What are the open questions related to delete file serialization? It might be better to finalize that to make this PR complete.
```cpp
firstRowInSplit_ = rowReader_->nextRowNumber();
```

```cpp
// TODO: Deserialize the std::vector<IcebergDeleteFile> deleteFiles. For now
```
finish TODO
@yingsu00 We probably want to create some `DeltaProcessor` interface class like this and call it in different parts of `HiveDataSource`, instead of subclassing `HiveDataSource`, because the split is not guaranteed to be the same type during the life cycle of the data source. At the end of the day, `DeltaProcessor` is a property of the split, not of the data source.
```cpp
class DeltaProcessor {
 public:
  // Can delegate to a factory registry to further decouple different
  // implementations.
  static std::unique_ptr<DeltaProcessor> create(const HiveConnectorSplit&);

  virtual ~DeltaProcessor() = default;

  // Called in HiveDataSource::addSplit to prepare delta readers.
  virtual void prepareSplit() = 0;

  virtual void readNext(RowReader& reader, int64_t size, VectorPtr& output) = 0;

  // In case delta readers need to be aware of filters.
  virtual void addDynamicFilter() = 0;

  // Flushes the delta reader's data (e.g., if we split an update into
  // delete + append, the newly appended rows need to be flushed here).
  virtual void finishSplit() = 0;
};
```
Let me know what you think. I can create a skeleton for this class and implement it for the basic case (no delta).
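A rough usage sketch of what the call sites could look like (hypothetical member names such as `deltaProcessor_`; this is not existing Velox code):

```cpp
// Hypothetical call sites inside HiveDataSource. The processor is re-created
// for every split, so a plain split and a delta split can alternate freely.
void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
  auto hiveSplit = std::dynamic_pointer_cast<HiveConnectorSplit>(split);
  deltaProcessor_ = DeltaProcessor::create(*hiveSplit);
  deltaProcessor_->prepareSplit();
  // ... existing base-file reader setup ...
}

RowVectorPtr HiveDataSource::next(int64_t size) {
  VectorPtr output;
  // The processor wraps the base RowReader and applies deltas while reading.
  deltaProcessor_->readNext(*rowReader_, size, output);
  return std::static_pointer_cast<RowVector>(output);
}
```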
```cpp
namespace facebook::velox::connector::hive::iceberg {

struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
  std::vector<IcebergDeleteFile> deleteFiles;
```
Are you parsing the delete file information in the Presto worker? Why not just parse it inside `HiveIcebergDataSource` in Velox? That way you would not need a specific split type for Iceberg, because `HiveConnectorSplit` should already have all the information you need.
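A minimal sketch of that suggestion (assuming `extraFileInfo` carries the serialized payload; `parseIcebergDeleteFiles` is a hypothetical helper, and its wire format is still an open question in this PR):

```cpp
// Hypothetical: decode Iceberg delete files from the generic split instead of
// introducing a dedicated split type.
void HiveIcebergDataSource::addSplit(
    std::shared_ptr<HiveConnectorSplit> split) {
  std::vector<IcebergDeleteFile> deleteFiles;
  if (split->extraFileInfo) {
    deleteFiles = parseIcebergDeleteFiles(*split->extraFileInfo);
  }
  // ... open delete file readers before reading the base file ...
}
```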
```cpp
    : id(_id), name(_name), type(_type), doc(_doc) {}
};

#define ICEBERG_DELETE_FILE_PATH_COLUMN() \
```
Why macros?
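For context, one macro-free alternative (a sketch, not the PR's actual code): per the Iceberg spec, positional delete files always have the two columns `file_path` (string) and `pos` (long), so a plain function can build the type.

```cpp
#include "velox/type/Type.h"

namespace facebook::velox::connector::hive::iceberg {

// Sketch of a function-based replacement for the macro: the schema of an
// Iceberg positional delete file is fixed by the spec.
inline RowTypePtr positionalDeleteFileType() {
  return ROW({"file_path", "pos"}, {VARCHAR(), BIGINT()});
}

} // namespace facebook::velox::connector::hive::iceberg
```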
```cpp
  VELOX_NYI();
}

void HiveIcebergDataSource::openDeletes(
```
We cannot load all delete information into memory. You need to keep the delta readers here (I wrap them in local classes like `DeltaDeleteReader`/`DeltaUpdateReader` to encapsulate the logic). You could also keep the data sources, but keeping the readers directly will make future optimizations easier (e.g., overlapping the loading of the base file with the decoding of delete files).
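To illustrate why streaming is feasible here (a minimal sketch with hypothetical names, not the PR's code): positional delete files are sorted by row position, so a single cursor over the delete stream can be merged against consecutive batches of the base file without materializing all deletes.

```cpp
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// 'nextDeletePos' stands in for pulling the next sorted position from the
// delete file's reader; 'pending' holds a position that belongs to a later
// batch. Returns a per-row deleted flag for [firstRow, firstRow + batchSize).
std::vector<bool> buildDeleteMaskForBatch(
    const std::function<std::optional<int64_t>()>& nextDeletePos,
    std::optional<int64_t>& pending,
    int64_t firstRow,
    int64_t batchSize) {
  std::vector<bool> deleted(batchSize, false);
  while (true) {
    if (!pending) {
      pending = nextDeletePos();
      if (!pending) {
        break;  // Delete stream exhausted.
      }
    }
    if (*pending >= firstRow + batchSize) {
      break;  // Position belongs to a later batch; keep it pending.
    }
    if (*pending >= firstRow) {
      deleted[*pending - firstRow] = true;
    }
    pending.reset();
  }
  return deleted;
}
```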
@Yuhta I intended to do this in future PRs. Do we have to do it now?
This will have a rather big impact on how you structure things, so it is better to do it right now; otherwise most of what you write now will need to be rewritten later.
@Yuhta Thanks for brainstorming. Could you give me an example of when this would happen?
@yingsu00 For now this is not switching between two different frameworks, but it could happen that the first split is a normal split without deltas while a subsequent one is a split with deltas. So processing it at the per-split level looks like the right thing to me.
```diff
@@ -41,7 +41,7 @@ template <typename T>
 VectorPtr createScalar(
     size_t size,
     std::mt19937& gen,
-    std::function<T()> val,
+    std::function<T(size_t)> val,
```
@yingsu00: BatchMaker is deprecated. Can you use VectorMaker instead?
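For reference, a small sketch of the suggested replacement (`VectorMaker` lives in Velox's test utilities; `pool` is assumed to be an available `memory::MemoryPool*`):

```cpp
#include "velox/vector/tests/utils/VectorMaker.h"

using namespace facebook::velox;

// Builds a flat vector from a size and a per-row generator, matching the
// std::function<T(size_t)> shape in the diff above.
test::VectorMaker vectorMaker(pool);
auto vector = vectorMaker.flatVector<int64_t>(
    1'000, [](vector_size_t row) { return row * 2; });
```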
This is fine. But the splits for reading an Iceberg table will always come in with `extraFileInfo["table_format"] = "iceberg"`.
This is not guaranteed for future table formats.
But Iceberg splits will always come in with this, similar to Hudi. We can guarantee that, and their data source can be constructed accordingly.
@yingsu00 We are designing the framework for all these table formats, so dispatching at the split level is the right thing here.
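A minimal sketch of what split-level dispatch could look like (all names hypothetical, not existing Velox API), using a registry keyed by the split's table format:

```cpp
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

class DeltaProcessor;  // The interface sketched earlier in this thread.

// Hypothetical factory registry: each table format (iceberg, hudi, ...)
// registers itself; plain Hive splits get no processor.
using DeltaProcessorFactory =
    std::function<std::unique_ptr<DeltaProcessor>(const std::string& path)>;

inline std::unordered_map<std::string, DeltaProcessorFactory>& registry() {
  static std::unordered_map<std::string, DeltaProcessorFactory> factories;
  return factories;
}

inline std::unique_ptr<DeltaProcessor> createDeltaProcessor(
    const std::unordered_map<std::string, std::string>& extraFileInfo,
    const std::string& path) {
  auto format = extraFileInfo.find("table_format");
  if (format == extraFileInfo.end()) {
    return nullptr;  // A plain Hive split: no delta processing needed.
  }
  auto factory = registry().find(format->second);
  return factory == registry().end() ? nullptr : factory->second(path);
}
```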
But you can't mix everything together. These table format specs are very different from each other, and they are quite complex. There has to be some abstraction, layering, and separation of concerns. And how was the decision made?
Superseded by #7362
This PR is the first cut to support reading Iceberg tables with positional delete files.
Design doc: #5977
TODO:
Catches: